{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2020 NVIDIA Corporation. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "# =============================================================================="
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Conversion Script for Criteo Dataset (CSV-to-Parquet) \n",
    "\n",
    "__Step 1__: Import libraries"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import rmm\n",
    "import cudf\n",
    "from cudf.io.parquet import ParquetWriter\n",
    "from fsspec.core import get_fs_token_paths\n",
    "import numpy as np\n",
    "import pyarrow.parquet as pq\n",
    "\n",
    "from dask.dataframe.io.parquet.utils import _analyze_paths\n",
    "from dask.base import tokenize\n",
    "from dask.utils import natural_sort_key\n",
    "from dask.highlevelgraph import HighLevelGraph\n",
    "from dask.delayed import Delayed\n",
    "from dask.distributed import Client\n",
    "from dask_cuda import LocalCUDACluster\n",
    "\n",
    "import nvtabular as nvt\n",
    "from nvtabular.utils import device_mem_size"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "__Step 2__: Specify options\n",
    "\n",
    "Specify the input and output paths, unless the `INPUT_DATA_DIR` and `OUTPUT_DATA_DIR` environment variables are already set. In order to utilize a multi-GPU system, be sure to specify `allow_multi_gpu=True` (and check the setting of your `CUDA_VISIBLE_DEVICES` environment variable)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "INPUT_PATH = os.environ.get('INPUT_DATA_DIR', '/datasets/criteo/crit_orig')\n",
    "OUTPUT_PATH = os.environ.get('OUTPUT_DATA_DIR', '/raid/criteo/tests/demo_out')\n",
    "CUDA_VISIBLE_DEVICES = os.environ.get(\"CUDA_VISIBLE_DEVICES\", \"0\")\n",
    "n_workers = len(CUDA_VISIBLE_DEVICES.split(\",\"))\n",
    "frac_size = 0.15\n",
    "allow_multi_gpu = False\n",
    "use_rmm_pool = False\n",
    "max_day = None  # (Optional) -- Limit the dataset to day 0-max_day for debugging"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "__Step 3__: Define helper/task functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def _pool(frac=0.8):\n",
    "    rmm.reinitialize(\n",
    "        pool_allocator=True,\n",
    "        initial_pool_size=frac * device_mem_size(),\n",
    "    )\n",
    "    \n",
    "def _convert_file(path, name, out_dir, frac_size, fs, cols, dtypes):\n",
    "    fn = f\"{name}.parquet\"\n",
    "    out_path = fs.sep.join([out_dir, f\"{name}.parquet\"])\n",
    "    writer = ParquetWriter(out_path, compression=None)\n",
    "    for gdf in nvt.Dataset(\n",
    "        path,\n",
    "        engine=\"csv\",\n",
    "        names=cols,\n",
    "        part_memory_fraction=frac_size,\n",
    "        sep='\\t',\n",
    "        dtypes=dtypes,\n",
    "    ).to_iter():\n",
    "        writer.write_table(gdf)\n",
    "        del gdf\n",
    "    md = writer.close(metadata_file_path=fn)\n",
    "    return md\n",
    "\n",
    "def _write_metadata(md_list, fs, path):\n",
    "    rg_sizes = []\n",
    "    if md_list:\n",
    "        metadata_path = fs.sep.join([path, \"_metadata\"])\n",
    "        _meta = (\n",
    "            cudf.io.merge_parquet_filemetadata(md_list)\n",
    "            if len(md_list) > 1\n",
    "            else md_list[0]\n",
    "        )\n",
    "        with fs.open(metadata_path, \"wb\") as fil:\n",
    "            _meta.tofile(fil)\n",
    "    return True"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "__Step 4__: (Optionally) Start a Dask cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Start up cluster if we have multiple devices\n",
    "# (and `allow_multi_gpu == True`)\n",
    "client = None\n",
    "if n_workers > 1 and allow_multi_gpu:\n",
    "    cluster = LocalCUDACluster(\n",
    "        n_workers=n_workers,\n",
    "        CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES,\n",
    "    )\n",
    "    client = Client(cluster)\n",
    "    if use_rmm_pool:\n",
    "        client.run(_pool)\n",
    "elif use_rmm_pool:\n",
    "    _pool()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "__Step 5__: Main conversion script (build Dask task graph)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "fs = get_fs_token_paths(INPUT_PATH, mode=\"rb\")[0]\n",
    "file_list = [\n",
    "    x for x in fs.glob(fs.sep.join([INPUT_PATH, \"day_*\"]))\n",
    "    if not x.endswith(\"parquet\")\n",
    "]\n",
    "file_list = sorted(file_list, key=natural_sort_key)\n",
    "file_list = file_list[:max_day] if max_day else file_list\n",
    "name_list = _analyze_paths(file_list, fs)[1]\n",
    "\n",
    "cont_names = [\"I\" + str(x) for x in range(1, 14)]\n",
    "cat_names = [\"C\" + str(x) for x in range(1, 27)]\n",
    "cols = [\"label\"] + cont_names + cat_names\n",
    "\n",
    "dtypes = {}\n",
    "dtypes[\"label\"] = np.int32\n",
    "for x in cont_names:\n",
    "    dtypes[x] = np.int32\n",
    "for x in cat_names:\n",
    "    dtypes[x] = \"hex\"\n",
    "\n",
    "dsk = {}\n",
    "token = tokenize(file_list, name_list, OUTPUT_PATH, frac_size, fs, cols, dtypes)\n",
    "convert_file_name = \"convert_file-\" + token\n",
    "for i, (path, name) in enumerate(zip(file_list, name_list)):\n",
    "    key = (convert_file_name, i)\n",
    "    dsk[key] = (_convert_file, path, name, OUTPUT_PATH, frac_size, fs, cols, dtypes)\n",
    "\n",
    "write_meta_name = \"write-metadata-\" + token\n",
    "dsk[write_meta_name] = (\n",
    "    _write_metadata,\n",
    "    [(convert_file_name, i) for i in range(len(file_list))],\n",
    "    fs,\n",
    "    OUTPUT_PATH,\n",
    ")\n",
    "graph = HighLevelGraph.from_collections(write_meta_name, dsk, dependencies=[])\n",
    "conversion_delayed = Delayed(write_meta_name, graph)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**__Step 6__**: Execute conversion"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 8min 48s, sys: 12min 53s, total: 21min 41s\n",
      "Wall time: 30min 51s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "if client:\n",
    "    conversion_delayed.compute()\n",
    "else:\n",
    "    conversion_delayed.compute(scheduler=\"synchronous\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
